package command

import (
	"fmt"
	_ "github.com/logoove/sqlite"
	"os"
	_ "path"
	"redis-check/client"
	"redis-check/common"
	"redis-check/conf"
	"redis-check/tool"
	"strconv"
	"sync"
	"time"
)

type TestOptions struct {
	common.BaseOptions
	Source   conf.RedisArgs `group:"source" description:"Redis配置信息"`
	Commands string         `long:"cmd" default:"get,set" description:"执行的命令，多个逗号分隔，可选值[get,set,hget,hset,zadd,sadd,lpush]"`
	Name     string         `long:"name" default:"redis-test" description:"测试的名称，会添加到随机生成Key的前面"`
	KeyLen   int            `long:"key" default:"0" min:"0" max:"1000" description:"随机Key的长度，有效范围：[0, 1000]"`
	ValueLen int            `long:"value" default:"100" min:"1" max:"10000" description:"值的长度，有效范围：[1,10000]"`
	ReqNum   int            `long:"requests" default:"100" min:"1" max:"1000000" description:"命令请求数量，默认值：10，有效范围：[1, 1000000]"`
	Qps      int            `long:"qps" default:"10000" min:"1" max:"1000000" description:"每秒请求数量，默认值：10000，有效范围：[1, 1000000]"`
	Threads  int            `long:"threads" default:"1" min:"1" max:"100" description:"并发连接数量，默认：1，有效范围：[1,100]"`
	ExecTime int            `long:"time" default:"0" min:"0" max:"3600" description:"执行时间，大于0时生效，单位：秒，有效范围：[0,3600]"`
	Clean    bool           `long:"clean" description:"是否清除测试数据，默认：false"`
}

type TestCommand struct {
	Options    *TestOptions
	SourceHost client.RedisHost
	Prefix     string
}

type TestValue struct {
	key   []byte
	value []byte
	index int
}

func (p *TestCommand) Execute(args []string) error {
	ret, err := common.RunWithArgs(p.Options, os.Args[2:])
	if !ret {
		return err
	}
	p.SourceHost = p.Options.Source.ToRedisHost("source", true)
	p.Prefix = p.Options.Name
	if p.Prefix == "" {
		p.Prefix = "redis-test"
	}
	common.Logger.Infof("开始测试，Key前缀：%v", p.Prefix)
	for _, cmd := range common.ToArray(p.Options.Commands) {
		p.handleTest(cmd)
	}
	if p.Options.Clean {
		p.cleanData()
	}
	common.Logger.Infof("--------------- 测试完成 ----------------")
	return nil
}

func (p *TestCommand) handleTest(command string) {
	if "set" == command {
		p.testCommand(command, false, func(client *client.RedisClient, key []byte, i int, value []byte) (interface{}, error) {
			return client.Set(&common.Key{Key: key}, value)
		})
	} else if "get" == command {
		p.testCommand(command, true, func(client *client.RedisClient, key []byte, i int, value []byte) (interface{}, error) {
			return client.Get(&common.Key{Key: key})
		})
	} else if "hset" == command {
		p.testCommand(command, false, func(client *client.RedisClient, key []byte, i int, value []byte) (interface{}, error) {
			return client.HashSet(&common.Key{Key: key}, []interface{}{[]byte(common.RandomStr(10) + strconv.Itoa(i)), value})
		})
	} else if "hget" == command {
		p.testCommand(command, true, func(client *client.RedisClient, key []byte, i int, value []byte) (interface{}, error) {
			return client.HashGet(&common.Key{Key: key}, []byte(common.RandomStr(10)+strconv.Itoa(i)))
		})
	} else if "sadd" == command {
		p.testCommand(command, false, func(client *client.RedisClient, key []byte, i int, value []byte) (interface{}, error) {
			return client.SetAdd(&common.Key{Key: key}, []interface{}{value})
		})
	} else if "zadd" == command {
		p.testCommand(command, false, func(client *client.RedisClient, key []byte, i int, value []byte) (interface{}, error) {
			return client.ZsetAdd(&common.Key{Key: key}, []interface{}{value})
		})
	} else if "lpush" == command {
		p.testCommand(command, false, func(client *client.RedisClient, key []byte, i int, value []byte) (interface{}, error) {
			return client.ListPush(&common.Key{Key: key}, []interface{}{value})
		})
	} else {
		fmt.Fprintf(os.Stdout, "测试命令[%s]不支持\n", command)
	}

}

func (p *TestCommand) testCommand(command string, readMode bool, executor func(*client.RedisClient, []byte, int, []byte) (interface{}, error)) {
	queue := make(chan TestValue, 1000)
	counter := &common.Counter{Async: true}
	var wg sync.WaitGroup
	wg.Add(p.Options.Threads)
	for i := 0; i < p.Options.Threads; i++ {
		go func() {
			defer wg.Done()
			redisClient := tool.RedisConnect(p.SourceHost)
			defer redisClient.Close()
			for item := range queue {
				_, err := executor(redisClient, item.key, item.index, item.value)
				if err != nil {
					counter.Err(1, err.Error())
				} else {
					counter.Add(1)
				}
			}
		}()
	}
	counter.Init("测试命令[" + command + "]")
	begin := time.Now().UnixNano() / 1e6
	p.produceData(queue, readMode, p.Options.ExecTime)
	wg.Wait()
	end := time.Now().UnixNano() / 1e6
	counter.Finish("测试命令[%s]执行完成，执行数量：%d，连接数：%d，总耗时：%d，平均耗时：%d", command, counter.Count(), p.Options.Threads, end-begin, (end-begin)/counter.Count())
}

func (p *TestCommand) produceData(queue chan TestValue, readMode bool, seconds int) {
	var endTime int64
	if seconds > 0 {
		endTime = time.Now().Add(time.Second * time.Duration(seconds)).UnixNano()
	}
	qos := common.StartQoS(p.Options.Qps)
	defer qos.Close()
	for i := 0; i < p.Options.ReqNum || (endTime > 0 && endTime > time.Now().UnixNano()); i++ {
		<-qos.Bucket
		key := []byte(p.generateKey())
		var value []byte
		if !readMode {
			value = []byte(common.RandomStr(p.Options.ValueLen))
		}
		queue <- TestValue{
			key:   key,
			value: value,
			index: i,
		}
	}
	close(queue)
}

func (p *TestCommand) generateKey() string {
	if p.Options.KeyLen > 0 {
		return p.Prefix + ":" + common.RandomStr(p.Options.KeyLen)
	}
	return p.Prefix
}

func (p *TestCommand) cleanData() {
	common.Logger.Infof("开始清理测试数据，Key前缀：%v", p.Prefix)
	redisClient := tool.RedisConnect(p.SourceHost)
	defer redisClient.Close()
	cleanCount, _ := redisClient.ScanKeys(func(keys []*common.Key) (interface{}, error) {
		return redisClient.PipeDelCommand(keys)
	}, client.ScanOpts{Match: p.Prefix + "*", Count: 1000})
	common.Logger.Infof("清理测试数据完成，清理数量：%v", cleanCount)
}
